public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
/** * Causes the current thread to wait until the latch has counted down to * zero, unless the thread is {@linkplain Thread#interrupt interrupted}, * or the specified waiting time elapses. * * <p>If the current count is zero then this method returns immediately * with the value {@code true}. * * <p>If the current count is greater than zero then the current * thread becomes disabled for thread scheduling purposes and lies * dormant until one of three things happen: * <ul> * <li>The count reaches zero due to invocations of the * {@link #countDown} method; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or * <li>The specified waiting time elapses. * </ul> */ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
/** All mechanics via AbstractQueuedSynchronizer subclass */ private final Sync sync;
/** * Synchronization implementation for semaphore. Uses AQS state * to represent permits. Subclassed into fair and nonfair * versions. */ abstract static class Sync extends AbstractQueuedSynchronizer { xx省略一万字xx }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
/** * CAS waitStatus field of a node. */ private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); }
hasQueuedPredecessors()
1 2 3 4 5 6 7 8 9 10
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
/** * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and which * will execute the given barrier action when the barrier is tripped, * performed by the last thread entering the barrier. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @param barrierAction the command to execute when the barrier is * tripped, or {@code null} if there is no action * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) thrownew IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
publicvoidreset(){ final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
创建新一代屏障:更新屏障闸的state字段并唤醒每个线程(持锁时有效)。
1 2 3 4 5 6 7
privatevoidnextGeneration(){ // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
privateintdowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock;//使用重入锁。 lock.lock();//锁内操作保安全 try { final Generation g = generation;
if (g.broken) thrownew BrokenBarrierException();
if (Thread.interrupted()) { breakBarrier(); thrownew InterruptedException(); } //count为parties即聚集个数(互相等待的个数) //当index减为0,即所有线程都执行了该dowait()方法,都已执行完毕。 int index = --count; if (index == 0) { // tripped boolean ranAction = false; try {//如果有执行命令则运行,并返回方法。 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return0; } finally { if (!ranAction)//执行命令为null,则破除屏障 breakBarrier(); } }
// loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); elseif (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } }
/** * A capability-based lock with three modes for controlling read/write * access. The state of a StampedLock consists of a version and mode. * Lock acquisition methods return a stamp that represents and * controls access with respect to a lock state; "try" versions of * these methods may instead return the special value zero to * represent failure to acquire access. Lock release and conversion * methods require stamps as arguments, and fail if they do not match * the state of the lock. …… * * <li><b>Optimistic Reading.</b> Method {@link #tryOptimisticRead} * returns a non-zero stamp only if the lock is not currently held * in write mode. Method {@link #validate} returns true if the lock * has not been acquired in write mode since obtaining a given * stamp. This mode can be thought of as an extremely weak version * of a read-lock, that can be broken by a writer at any time. The * use of optimistic mode for short read-only code segments often * reduces contention and improves throughput. However, its use is * inherently fragile. Optimistic read sections should only read * fields and hold them in local variables for later use after * validation. Fields read while in optimistic mode may be wildly * inconsistent, so usage applies only when you are familiar enough * with data representations to check consistency and/or repeatedly * invoke method {@code validate()}. For example, such steps are * typically required when first reading an object or array * reference, and then accessing one of its fields, elements or * methods. </li> * * Algorithmic notes: * * The design employs elements of Sequence locks * (as used in linux kernels; see Lameter's * http://www.lameter.com/gelato2005.pdf * and elsewhere; see * Boehm's http://www.hpl.hp.com/techreports/2012/HPL-2012-68.html) * and Ordered RW locks (see Shirako et al * http://dl.acm.org/citation.cfm?id=2312015) * * Conceptually, the primary state of the lock includes a sequence * number that is odd when write-locked and even otherwise. * However, this is offset by a reader count that is non-zero when * read-locked. The read count is ignored when validating * "optimistic" seqlock-reader-style stamps. Because we must use * a small finite number of bits (currently 7) for readers, a * supplementary reader overflow word is used when the number of * readers exceeds the count field. We do this by treating the max * reader count value (RBITS) as a spinlock protecting overflow * updates. * * Waiters use a modified form of CLH lock used in * AbstractQueuedSynchronizer (see its internal documentation for * a fuller account), where each node is tagged (field mode) as * either a reader or writer. Sets of waiting readers are grouped * (linked) under a common node (field cowait) so act as a single * node with respect to most CLH mechanics. By virtue of the * queue structure, wait nodes need not actually carry sequence * numbers; we know each is greater than its predecessor. This * simplifies the scheduling policy to a mainly-FIFO scheme that * incorporates elements of Phase-Fair locks (see Brandenburg & * Anderson, especially http://www.cs.unc.edu/~bbb/diss/). In * particular, we use the phase-fair anti-barging rule: If an * incoming reader arrives while read lock is held but there is a * queued writer, this incoming reader is queued. (This rule is * responsible for some of the complexity of method acquireRead, * but without it, the lock becomes highly unfair.) Method release * does not (and sometimes cannot) itself wake up cowaiters. This * is done by the primary thread, but helped by any other threads * with nothing better to do in methods acquireRead and * acquireWrite. * * These rules apply to threads actually queued. All tryLock forms * opportunistically try to acquire locks regardless of preference * rules, and so may "barge" their way in. Randomized spinning is * used in the acquire methods to reduce (increasingly expensive) * context switching while also avoiding sustained memory * thrashing among many threads. We limit spins to the head of * queue. A thread spin-waits up to SPINS times (where each * iteration decreases spin count with 50% probability) before * blocking. If, upon wakening it fails to obtain lock, and is * still (or becomes) the first waiting thread (which indicates * that some other thread barged and obtained lock), it escalates * spins (up to MAX_HEAD_SPINS) to reduce the likelihood of * continually losing to barging threads. * * Nearly all of these mechanics are carried out in methods * acquireWrite and acquireRead, that, as typical of such code, * sprawl out because actions and retries rely on consistent sets * of locally cached reads. * * As noted in Boehm's paper (above), sequence validation (mainly * method validate()) requires stricter ordering rules than apply * to normal volatile reads (of "state"). To force orderings of * reads before a validation and the validation itself in those * cases where this is not already forced, we use * Unsafe.loadFence. * * The memory layout keeps lock state and queue pointers together * (normally on the same cache line). This usually works well for * read-mostly loads. In most other cases, the natural tendency of * adaptive-spin CLH locks to reduce memory contention lessens * motivation to further spread out contended locations, but might * be subject to future improvements. */
public ReentrantLock() { sync = new NonfairSync(); }
/** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L;
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
/** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
/** * Sets the thread that currently owns exclusive access. * A {@code null} argument indicates that no thread owns access. * This method does not otherwise impose any synchronization or * {@code volatile} field accesses. * @param thread the owner thread */ protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
/** * Convenience method to interrupt current thread. */ static void selfInterrupt() { Thread.currentThread().interrupt(); }
public void interrupt() { if (this != Thread.currentThread()) checkAccess();
synchronized (blockerLock) { Interruptible b = blocker; if (b != null) { interrupt0(); // Just to set the interrupt flag b.interrupt(this); return; } } interrupt0(); }